---
title: "DataLoaderSource"
description: "Batch data loading source for processing files and external data sources with support for multiple data formats"
---

The `DataLoaderSource` class provides batch data loading capabilities for processing data from files and external sources. It supports multiple data formats and is designed for ETL pipelines and large-scale data ingestion scenarios.

## DataFormat Enum

Enumeration of supported data formats for batch loading.

```python
DataFormat(*args, **kwds)
```

### Available Formats

<ParamField path="CSV" type="DataFormat">
Comma-separated values format for tabular data.
</ParamField>

<ParamField path="JSON" type="DataFormat">
JavaScript Object Notation format for structured data.
</ParamField>

<ParamField path="PARQUET" type="DataFormat">
Apache Parquet columnar storage format for efficient data processing.
</ParamField>

<ParamField path="ORC" type="DataFormat">
Optimized Row Columnar format for big data processing.
</ParamField>

<ParamField path="XML" type="DataFormat">
eXtensible Markup Language format for structured documents.
</ParamField>

<ParamField path="FWF" type="DataFormat">
Fixed-width format for structured text data with fixed column positions.
</ParamField>
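
Assuming `DataFormat` behaves like a standard Python `Enum` (and using the top-level `superlinked` import shown in the examples below), members can be listed or looked up by name, which is handy when the format comes from user input or configuration:

```python
from superlinked import DataFormat

# List the supported formats programmatically, e.g. to validate a
# user-supplied format string before building a DataLoaderConfig.
print([fmt.name for fmt in DataFormat])  # e.g. ['CSV', 'JSON', 'PARQUET', 'ORC', 'XML', 'FWF']

csv_format = DataFormat["CSV"]  # look up a member by its name
```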

## DataLoaderConfig

Configuration object for specifying data loading parameters.

```python
DataLoaderConfig(path, format, name=None, pandas_read_kwargs=None)
```

### Parameters

<ParamField path="path" type="str" required>
The file path or URL to the data source. Accepts local file paths, URLs, and cloud storage paths (for example, `s3://` URIs).
</ParamField>

<ParamField path="format" type="DataFormat" required>
The format of the data source, specified using the DataFormat enum.
</ParamField>

<ParamField path="name" type="str | None" default="None">
Optional name for the data loader configuration. Useful for identifying multiple data sources.
</ParamField>

<ParamField path="pandas_read_kwargs" type="dict[str, Any] | None" default="None">
Additional keyword arguments passed to the pandas read function (e.g., `read_csv`, `read_json`). Allows fine-tuning of data loading behavior.
</ParamField>

## DataLoaderSource

Main class for batch data loading from configured sources.

```python
DataLoaderSource(schema, data_loader_config, parser=None)
```

### Parameters

<ParamField path="schema" type="IdSchemaObjectT" required>
The schema object that defines the structure of data this source will handle. All loaded data must conform to this schema.
</ParamField>

<ParamField path="data_loader_config" type="DataLoaderConfig" required>
Configuration object specifying the data source, format, and loading parameters.
</ParamField>

<ParamField path="parser" type="DataParser | None" default="None">
Optional data parser for processing loaded data. If None, an appropriate parser is selected based on the data format.
</ParamField>

### Properties

<ParamField path="config" type="DataLoaderConfig" required>
The configuration object used for this data loader.
</ParamField>

<ParamField path="name" type="str" required>
The name identifier for this data loader source.
</ParamField>
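
A minimal sketch of reading these properties back from a configured source; the file path is a placeholder and `product_schema` is assumed to be a schema object defined elsewhere, as in the examples below:

```python
from superlinked import DataLoaderSource, DataLoaderConfig, DataFormat

config = DataLoaderConfig(
    path="data/products.csv",
    format=DataFormat.CSV,
    name="product_data"
)
source = DataLoaderSource(schema=product_schema, data_loader_config=config)

# Inspect the source after construction.
print(source.name)         # expected to reflect the configured name, e.g. "product_data"
print(source.config.path)  # "data/products.csv"
```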

## Inheritance

The `DataLoaderSource` class extends the framework's online source classes:

**Inheritance Chain**: `DataLoaderSource` → `OnlineSource` → `TransformerPublisher` → `Source` → `Generic`

## Data Loading Examples

### CSV File Loading

```python
from superlinked import DataLoaderSource, DataLoaderConfig, DataFormat

# Configure CSV loading
csv_config = DataLoaderConfig(
    path="data/products.csv",
    format=DataFormat.CSV,
    name="product_data",
    pandas_read_kwargs={
        "delimiter": ",",
        "header": 0,
        "encoding": "utf-8"
    }
)

# Create data loader source
csv_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=csv_config
)
```

### JSON File Loading

```python
# Configure JSON loading
json_config = DataLoaderConfig(
    path="data/articles.json",
    format=DataFormat.JSON,
    name="article_data",
    pandas_read_kwargs={
        "orient": "records",
        "lines": True  # For JSONL format
    }
)

json_source = DataLoaderSource(
    schema=article_schema,
    data_loader_config=json_config
)
```

### Parquet File Loading

```python
# Configure Parquet loading
parquet_config = DataLoaderConfig(
    path="data/large_dataset.parquet",
    format=DataFormat.PARQUET,
    name="large_data",
    pandas_read_kwargs={
        "columns": ["id", "title", "content", "category"],
        "filters": [("category", "in", ["tech", "science"])]
    }
)

parquet_source = DataLoaderSource(
    schema=document_schema,
    data_loader_config=parquet_config
)
```

### Cloud Storage Loading

```python
# Load from cloud storage (S3, GCS, etc.)
cloud_config = DataLoaderConfig(
    path="s3://my-bucket/data/products.csv",
    format=DataFormat.CSV,
    name="cloud_products",
    pandas_read_kwargs={
        "storage_options": {
            "aws_access_key_id": "your_key",
            "aws_secret_access_key": "your_secret"
        }
    }
)

cloud_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=cloud_config
)
```

## Use Cases

### ETL Pipelines

Process large datasets from data warehouses:

```python
# Load from data warehouse export
warehouse_config = DataLoaderConfig(
    path="exports/daily_transactions.parquet",
    format=DataFormat.PARQUET,
    name="daily_transactions"
)

transaction_source = DataLoaderSource(
    schema=transaction_schema,
    data_loader_config=warehouse_config
)
```

### Batch Processing

Regular batch updates from external systems:

```python
# Daily batch processing; date_range is assumed to be an iterable of
# date strings, e.g. ["2024-01-01", "2024-01-02", ...]
for date in date_range:
    daily_config = DataLoaderConfig(
        path=f"data/daily_feeds/{date}.json",
        format=DataFormat.JSON,
        name=f"feed_{date}"
    )
    
    daily_source = DataLoaderSource(
        schema=feed_schema,
        data_loader_config=daily_config
    )
```

### Data Migration

Migrate data from existing systems:

```python
# Migrate from legacy CSV exports
migration_configs = [
    DataLoaderConfig(f"legacy_data/users_{i}.csv", DataFormat.CSV)
    for i in range(1, 11)  # 10 CSV files
]

migration_sources = [
    DataLoaderSource(user_schema, config)
    for config in migration_configs
]
```

### Research and Analytics

Load research datasets for analysis:

```python
# Research dataset loading
research_config = DataLoaderConfig(
    path="research_data/scientific_papers.json",
    format=DataFormat.JSON,
    name="research_papers",
    pandas_read_kwargs={
        "lines": True,      # pandas requires lines=True when chunksize is set for read_json
        "chunksize": 1000   # Process in chunks for large files
    }
)

research_source = DataLoaderSource(
    schema=paper_schema,
    data_loader_config=research_config
)
```

## Performance Optimization

### Chunked Processing

For large files, use pandas chunking:

```python
large_file_config = DataLoaderConfig(
    path="very_large_dataset.csv",
    format=DataFormat.CSV,
    pandas_read_kwargs={
        "chunksize": 10000,  # Process 10k rows at a time
        "low_memory": False
    }
)
```

### Column Selection

Load only needed columns to reduce memory usage:

```python
optimized_config = DataLoaderConfig(
    path="wide_dataset.parquet",
    format=DataFormat.PARQUET,
    pandas_read_kwargs={
        "columns": ["id", "title", "content"]  # Only load needed columns
    }
)
```

### Data Filtering

Apply filters during loading to reduce data volume:

```python
filtered_config = DataLoaderConfig(
    path="large_dataset.parquet",
    format=DataFormat.PARQUET,
    pandas_read_kwargs={
        "filters": [
            ("date", ">=", "2024-01-01"),
            ("category", "in", ["important", "urgent"])
        ]
    }
)
```

## Best Practices

### File Format Selection

<Tip>
**Parquet for Analytics**: Use Parquet format for large analytical datasets due to efficient compression and columnar storage.
</Tip>

<Tip>
**JSON for APIs**: Use JSON format when loading data from API responses or when maintaining data structure is important.
</Tip>

### Memory Management

<Warning>
**Large Files**: For files larger than available RAM, use chunked processing or streaming approaches to prevent memory issues.
</Warning>

### Error Handling

<Note>
**Data Validation**: Always validate loaded data against your schema before processing to catch format mismatches early.
</Note>
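
One lightweight way to do this is to sample the file with pandas before wiring it into a `DataLoaderSource` and compare its columns against the fields your schema expects. This sketch assumes a CSV input and a hand-maintained list of expected column names:

```python
import pandas as pd

# Columns the schema expects; adjust to match your own schema definition.
EXPECTED_COLUMNS = {"id", "title", "content", "category"}

# Read only the first few rows to inspect the header cheaply.
sample = pd.read_csv("data/products.csv", nrows=5)

missing = EXPECTED_COLUMNS - set(sample.columns)
if missing:
    raise ValueError(f"Input file is missing expected columns: {sorted(missing)}")
```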

### Performance Optimization

<Tip>
**Column Selection**: Only load columns you need to reduce memory usage and improve loading performance.
</Tip>

## Integration with Applications

`DataLoaderSource` integrates with Superlinked applications for batch processing:

```python
from superlinked import RestApp, Index, TextSimilaritySpace

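# product_schema, product_index, search_query, real_time_source, and
# vector_db are assumed to be defined elsewhere in the application.
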
# Create data loader for batch ingestion
batch_config = DataLoaderConfig(
    path="data/product_catalog.csv",
    format=DataFormat.CSV
)

batch_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=batch_config
)

# Create application with both batch and real-time sources
app = RestApp(
    sources=[batch_source, real_time_source],  # Batch + real-time
    indices=[product_index],
    queries=[search_query],
    vector_database=vector_db
)
```

The `DataLoaderSource` class provides efficient batch data loading capabilities essential for production data processing pipelines and large-scale vector search applications.